feat: add ScalaUDF support via a codegen dispatcher #4267

Draft
mbutrovich wants to merge 52 commits into apache:main from mbutrovich:codegen_scala_udf

Conversation

@mbutrovich mbutrovich commented May 8, 2026

Which issue does this PR close?

Closes #.

Rationale for this change

#4232 merged the JVM UDF bridge. This PR adds a codegen dispatcher on top: a CometUDF (CometScalaUDFCodegen) that compiles a specialized batch kernel per bound ScalaUDF expression and input schema via Janino. Without this path, any plan containing a ScalaUDF falls back to Spark for the enclosing operator, losing native execution on the surrounding plan.

The dispatcher is one of potentially many CometUDF implementations the bridge can route to. Hand-written CometUDFs for specific expression families (e.g. regex in #4239, JSON in #4305) remain a parallel path; the bridge dispatches by class name from the proto and does not require everything to go through the dispatcher.

Benefits:

  • Any ScalaUDF whose argument and return types are in the supported surface routes through native without a hand-written CometUDF.
  • The dispatcher binds the entire ScalaUDF argument tree, so Catalyst sub-expressions inside the UDF (upper(s), concat(c1, c2), monotonically_increasing_id()) compile into the same per-row loop as the user function.
  • Surrounding native operators stay native; the UDF is no longer a whole-operator fallback boundary.
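To make the fused per-row loop concrete, here is a conceptual sketch in plain Java (no Arrow, illustrative names only, not the real generated code): the Catalyst sub-expression and the user function land in the same loop body.

```java
// Conceptual sketch of what the compiled kernel does for a UDF
// applied to upper(s): the sub-expression and the user function are
// fused into one per-row loop. KernelSketch and userFn are
// hypothetical stand-ins, not APIs from this PR.
public class KernelSketch {
    // Stand-in for the user's ScalaUDF body.
    static int userFn(String s) {
        return s.length();
    }

    // Stand-in for the compiled batch kernel over a non-nullable
    // column, so the isNullAt checks are elided as described above.
    public static int[] evalBatch(String[] input) {
        int[] out = new int[input.length];
        for (int i = 0; i < input.length; i++) {
            String upper = input[i].toUpperCase(); // inlined Catalyst sub-expression
            out[i] = userFn(upper);                // user function call
        }
        return out;
    }
}
```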

Gated by spark.comet.exec.scalaUDF.codegen.enabled (default: true). When disabled, plans containing a ScalaUDF fall back to Spark for that operator.

The CometUDF contract loosens from "should be stateless" to "may hold per-task state in fields." One instance per Spark task attempt per class, reused across all batches of the task, dropped on task completion. Per-instance access is single-threaded because Spark runs one native future per partition and Tokio polls one future per worker at a time.
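A minimal sketch of what the loosened contract permits, with a hypothetical class name: a field accumulates across batches of one task, with no synchronization needed because access is single-threaded as described above.

```java
// Hypothetical CometUDF-style class illustrating per-task state in
// fields: one instance per (task attempt, class), reused across all
// batches of the task, dropped on task completion.
public class RowCountingUdf {
    private long rowsSeen = 0L; // per-task state; single-threaded access

    public long processBatch(int numRows) {
        rowsSeen += numRows; // accumulates across batches of one task
        return rowsSeen;
    }
}
```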

What changes are included in this PR?

  • Generic codegen infrastructure under org.apache.comet.codegen: CometBatchKernelCodegen (orchestrator) + CometBatchKernelCodegenInput / CometBatchKernelCodegenOutput (per-side emission) + CometBatchKernel Java base + CometInternalRow / CometArrayData / CometMapData shim bases + CometSpecializedGettersDispatch for shared get(ordinal, dataType) dispatch. The framework is generic over Catalyst expressions; today's only consumer is the ScalaUDF dispatcher.
  • ScalaUDF dispatcher under org.apache.comet.udf.codegen: CometScalaUDFCodegen (bridge entry, compile cache, per-partition kernel state).
  • Complex type support: ArrayType, StructType, and MapType as both input and output, including arbitrary nesting. Sealed ArrowColumnSpec plus recursive nested-class emission.
  • Optimization set applied per (expression, input schema): zero-copy UTF8 reads on VarCharVector, non-nullable isNullAt elision, decimal short-precision fast path on both sides, UTF8 on-heap write shortcut, pre-sized variable-length output buffers, NullIntolerant short-circuit, non-nullable output short-circuit, nullable-element elision on array / map writes, subexpression elimination. Complex-type output writes hoist getChildByOrdinal and cast to once-per-batch setup so the per-row body has no runtime type dispatch and no redundant casts. In-code TODOs flag three further optimizations the input side has and the output side does not yet (UTF8 inline-unsafe write, cached write-buffer addresses, nested var-width sizing).
  • Bridge instance cache: ConcurrentHashMap<Long, ConcurrentHashMap<String, CometUDF>> keyed by (taskAttemptId, className) with a TaskCompletionListener evicting the per-task entry. Invariant to Tokio work-stealing across batches: a task that migrates between workers still sees the same instance. Assertions on every invariant (single listener registration, non-null cache, reflective-instantiate success, TaskContext install effect).
  • Serde routing: CometScalaUDF routes any ScalaUDF whose tree passes CometBatchKernelCodegen.canHandle. Proto build is inlined; no other expressions adopt the dispatcher in this PR.
  • Allocation reuses Utils.toArrowField and Field.createVector for every output type. Input spec derives Spark DataTypes via Utils.fromArrowField. Exception paths close partially allocated vectors to avoid leaks. The Arrow Field is computed once per (expression, schema) cache entry rather than per batch.
  • User guide page docs/source/user-guide/latest/jvm_udf_dispatch.md covers the on/off config, supported and unsupported types, behavior notes, and the cross-query recompile caveat. Architecture lives in Scaladoc on CometScalaUDFCodegen and CometBatchKernelCodegen; in-code TODOs carry the open items.
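The bridge instance cache semantics above can be sketched as follows. Names are illustrative; the real cache lives in CometUdfBridge, looks up CometUDF subclasses, and evicts through a Spark TaskCompletionListener rather than an explicit call.

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-task instance cache: keyed by (taskAttemptId,
// className), reflective instantiation on first use, whole-task
// eviction. InstanceCacheSketch is a hypothetical stand-in.
public class InstanceCacheSketch {
    private static final ConcurrentHashMap<Long, ConcurrentHashMap<String, Object>> CACHE =
        new ConcurrentHashMap<>();

    public static Object getOrCreate(long taskAttemptId, String className) {
        ConcurrentHashMap<String, Object> perTask =
            CACHE.computeIfAbsent(taskAttemptId, id -> new ConcurrentHashMap<>());
        return perTask.computeIfAbsent(className, name -> {
            try {
                // Stands in for instantiating the CometUDF subclass.
                return Class.forName(name).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        });
    }

    // Invoked from a TaskCompletionListener in the real code.
    public static void evict(long taskAttemptId) {
        CACHE.remove(taskAttemptId);
    }
}
```

A task that migrates between Tokio workers still keys into the same (taskAttemptId, className) slot, which is the work-stealing invariant described above.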

How are these changes tested?

  • CometCodegenSourceSuite: generated-source assertions for every optimization and every complex-type shape.
  • CometCodegenDispatchSmokeSuite: end-to-end correctness across the scalar and complex type surface (primitives, binary input and output, decimal precision boundaries, date / timestamp / timestampNTZ, array / struct / map round-trips including nested shapes and primitive-keyed maps), composed UDF trees, subquery reuse, TaskContext propagation, per-task cache isolation across sequential runs, kernel-cache reuse across batches of one query, ScalaUDF as a child of a native Spark expression.
  • CometCodegenDispatchFuzzSuite: randomized decimal identity fuzz at several null densities.
  • CometScalaUDFCompositionBenchmark: Spark vs Comet with the dispatcher enabled vs disabled, over three composed-UDF shapes.

@mbutrovich

There are like 4 Spark SQL test failures that look like they might need updating, but otherwise it's looking good. Not gonna worry about them until we discuss moving forward.

mbutrovich and others added 6 commits May 14, 2026 13:06
# Conflicts:
#	common/src/main/java/org/apache/comet/udf/CometUdfBridge.java
#	common/src/main/scala/org/apache/comet/udf/CometUDF.scala
#	docs/source/contributor-guide/index.md
#	native/core/src/execution/jni_api.rs
#	native/core/src/execution/planner.rs
#	native/spark-expr/src/jvm_udf/mod.rs
#	spark/src/main/scala/org/apache/comet/CometExecIterator.scala
#	spark/src/main/scala/org/apache/comet/serde/strings.scala
@mbutrovich mbutrovich changed the title feat: Arrow-direct codegen dispatcher for Spark expressions and Scala UDFs feat: add ScalaUDF support via a codegen dispatcher May 14, 2026
@mbutrovich mbutrovich moved this from Todo to In progress in Comet Development May 14, 2026